{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Shallow Join of GDELT Data\n",
    "* This notebook demonstrates aggregation of data by a covering set\n",
    "* Here, we use GDELT as the data to be aggregated and a set of countries as the covering set\n",
    "* Countable fields of the GDELT data are automatically collected, totaled and averaged\n",
    "* The end result is a new Simple Feature Type with aggregate features \n",
    "* This result is then visualized in an interactive leaflet map"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Initialization:\n",
    "#### Import needed modules"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import org.locationtech.geomesa.accumulo.data.AccumuloDataStoreParams._\n",
    "import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreFactory}\n",
    "import org.geotools.data.{DataStore, DataStoreFinder}\n",
    "import org.apache.spark.{SparkConf, SparkContext}\n",
    "import org.locationtech.geomesa.compute.spark.GeoMesaSpark\n",
    "import scala.collection.JavaConversions._\n",
    "import org.apache.spark.rdd.RDD\n",
    "import org.opengis.feature.simple.SimpleFeature\n",
    "import org.apache.hadoop.conf.Configuration\n",
    "import org.geotools.data._\n",
    "import org.opengis.filter._\n",
    "import org.apache.spark.serializer.KryoRegistrator\n",
    "import org.locationtech.geomesa.features.kryo.serialization.SimpleFeatureSerializer\n",
    "import org.geotools.feature.collection.AbstractFeatureVisitor\n",
    "import org.locationtech.jts.geom.{Geometry, Point}\n",
    "import org.locationtech.geomesa.utils.geotools.{SchemaBuilder, SimpleFeatureTypes}\n",
    "import org.locationtech.geomesa.features.ScalaSimpleFeatureFactory\n",
    "import org.locationtech.geomesa.features.ScalaSimpleFeature\n",
    "import org.locationtech.geomesa.compute.spark.GeoMesaSparkKryoRegistrator\n",
    "import org.geotools.feature.simple.SimpleFeatureBuilder\n",
    "import org.locationtech.geomesa.features.ScalaSimpleFeature\n",
    "import org.geotools.filter.text.ecql.ECQL\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Initialize the data stores"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "val gdeltDsParams = Map(\n",
    "         zookeepersParam.key -> \"zoo1,zoo2,zoo3\",\n",
    "         instanceIdParam.key -> \"mycloud\",\n",
    "         userParam.key -> sc.getConf.get(\"spark.credentials.ds.username\"),\n",
    "         passwordParam.key -> sc.getConf.get(\"spark.credentials.ds.password\"),\n",
    "         tableNameParam.key -> \"gdelt\")\n",
    "\n",
    "val gdeltDs = DataStoreFinder.getDataStore(gdeltDsParams).asInstanceOf[AccumuloDataStore]\n",
    "\n",
    "val countriesDsParams = Map(\n",
    "        zookeepersParam.key -> \"zoo1,zoo2,zoo3\",\n",
    "        instanceIdParam.key -> \"mycloud\",\n",
    "        userParam.key -> sc.getConf.get(\"spark.credentials.ds.username\"),\n",
    "        passwordParam.key -> sc.getConf.get(\"spark.credentials.ds.password\"),\n",
    "        tableNameParam.key -> \"countries\")\n",
    "       \n",
    "val countriesDs = DataStoreFinder.getDataStore(countriesDsParams).asInstanceOf[AccumuloDataStore]        "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Register the sfts into the Kryo Registrator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "GeoMesaSpark.register(countriesDs)\n",
    "GeoMesaSpark.register(gdeltDs)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Broadcast the registered sfts to executors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "val sfts = sc.broadcast(GeoMesaSparkKryoRegistrator.typeCache.values.map{ sft => \n",
    "    (sft.getTypeName, SimpleFeatureTypes.encodeType(sft))\n",
    "}.toArray)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Get the target data into RDDs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "val gdeltRdd: RDD[SimpleFeature] = GeoMesaSpark.rdd(new Configuration(), sc, gdeltDsParams, new Query(\"gdelt\"))\n",
    "val countriesRdd: RDD[SimpleFeature] = GeoMesaSpark.rdd(new Configuration(), sc, countriesDsParams, new Query(\"countries\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Put the broadcasted sfts into each partition's type cache"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "gdeltRdd.foreachPartition{ iter =>              \n",
    "    sfts.value.foreach{ case (name, spec) => \n",
    "        val sft = SimpleFeatureTypes.createType(name, spec)\n",
    "        GeoMesaSparkKryoRegistrator.putType(sft)\n",
    "    }                       \n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Combine duplicate country entries."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "val keyed = countriesRdd.keyBy{sf => sf.getAttribute(\"FIPS\")}\n",
    "val reduced = keyed.reduceByKey( (featureA, featureB) => featureA)\n",
    "val broadcastedCountries = sc.broadcast(reduced.values.collect)\n",
    "println(broadcastedCountries.value.size)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Map:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "** Map GDELT data to containing country **  \n",
    "** Based on the geometry of the covering set, key by the country that contains the event **"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
   "val keyedData = gdeltRdd.mapPartitions { iter =>   \n",
   "    import org.locationtech.geomesa.utils.geotools.Conversions._\n",
   "\n",
   "    iter.flatMap { sf =>        \n",
   "        // Iterate over countries until a match is found\n",
   "        val it = broadcastedCountries.value.iterator\n",
   "        var container: Option[String] = None\n",
   "\n",
   "        while (it.hasNext) {\n",
   "          val cover = it.next()\n",
   "          // If the cover's polygon contains the feature,\n",
   "          // or in the case of non-point geoms, if they intersect, set the container\n",
   "          if (cover.geometry.intersects(sf.geometry)) {\n",
   "            container = Some(cover.getAttribute(key).asInstanceOf[String])\n",
   "          }\n",
   "        }\n",
   "        // return the found cover as the key\n",
   "        if (container.isDefined) {\n",
   "          Some(container.get, sf)\n",
   "        } else {\n",
   "          None\n",
   "        }\n",
   "    }                                            \n",
   "}"
  ]
  },
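  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The containment test above can be tried without Spark. The following is a minimal sketch, not part of the original workflow: the two square \"countries\" `A` and `B`, the helper names `square` and `containingCover`, and the sample points are made up for illustration. It assumes only that JTS is on the classpath.\n",
    "\n",
    "```scala\n",
    "import org.locationtech.jts.geom.{Coordinate, Envelope, Geometry, GeometryFactory}\n",
    "\n",
    "val gf = new GeometryFactory()\n",
    "\n",
    "// Hypothetical helper: an axis-aligned square standing in for a country polygon\n",
    "def square(minX: Double, minY: Double, size: Double): Geometry =\n",
    "  gf.toGeometry(new Envelope(minX, minX + size, minY, minY + size))\n",
    "\n",
    "// Hypothetical covering set, keyed by name\n",
    "val covers: Seq[(String, Geometry)] = Seq(\"A\" -> square(0, 0, 10), \"B\" -> square(10, 0, 10))\n",
    "\n",
    "// Return the name of the first cover that intersects the geometry, if any\n",
    "def containingCover(geom: Geometry): Option[String] =\n",
    "  covers.collectFirst { case (name, cover) if cover.intersects(geom) => name }\n",
    "\n",
    "val inside = containingCover(gf.createPoint(new Coordinate(12, 5)))   // Some(\"B\")\n",
    "val outside = containingCover(gf.createPoint(new Coordinate(50, 50))) // None\n",
    "```\n",
    "\n",
    "A point that lies exactly on a shared border intersects both covers, so the first match in iteration order wins. The same holds for the loop over the broadcast countries."
   ]
  },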
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Get the indices and types of the attributes that can be aggregated and send them to the partitions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
  "source": [
   "val countableTypes = Seq(\"Integer\", \"Long\", \"Double\")\n",
   "val typeNames = gdeltRdd.first.getType.getTypes.toIndexedSeq.map{ft => ft.getBinding.getSimpleName.toString}\n",
   "val countableIndices = typeNames.indices.flatMap { index => \n",
   "    val featureType = typeNames(index)\n",
   "    // Only grab countable types, skipping the ID field\n",
   "    if ((countableTypes contains featureType) && index != 0) {\n",
   "        Some(index, featureType)\n",
   "    } else {\n",
   "        None\n",
   "    }\n",
   "}.toArray\n",
   "countableIndices.foreach{println}\n",
   "val countable = sc.broadcast(countableIndices)"
  ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create a Simple Feature Type based on what can be aggregated\n",
    "Todo: store into sealed trait"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
   "val schemaBuilder = SchemaBuilder.builder()\n",
   "schemaBuilder.addString(\"country\")\n",
   "schemaBuilder.addMultiPolygon(\"geom\")\n",
   "schemaBuilder.addInt(\"count\")\n",
   "val featureProperties = gdeltRdd.first.getProperties.toSeq\n",
   "countableIndices.foreach { case (index, clazz) =>\n",
   "    val featureName = featureProperties.apply(index).getName\n",
   "    clazz match {\n",
   "      case \"Integer\" => schemaBuilder.addInt(s\"total_$featureName\")\n",
   "      case \"Long\" => schemaBuilder.addLong(s\"total_$featureName\")\n",
   "      case \"Double\" => schemaBuilder.addDouble(s\"total_$featureName\")\n",
   "    }\n",
   "    schemaBuilder.addDouble(s\"avg_${featureProperties.apply(index).getName}\")\n",
   "}\n",
   "val countryInfoSft = schemaBuilder.build(\"countryInformation\")"
  ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Register it with kryo and send it to executors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "GeoMesaSpark.register(Seq(countryInfoSft))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "val newSfts = sc.broadcast(GeoMesaSparkKryoRegistrator.typeCache.values.map{ sft => \n",
    "    (sft.getTypeName, SimpleFeatureTypes.encodeType(sft))\n",
    "}.toArray)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "keyedData.foreachPartition{ iter => \n",
    "    newSfts.value.foreach{ case (name, spec) => \n",
    "        val newSft = SimpleFeatureTypes.createType(name, spec)\n",
    "        GeoMesaSparkKryoRegistrator.putType(newSft)\n",
    "    }        \n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Reduce"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "** Reduce features by their country, computing sums and total counts **"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
   "\n",
   "val aggregate = keyedData.reduceByKey( (featureA, featureB) => {\n",
   "    \n",
   "    import org.locationtech.geomesa.utils.geotools.Conversions.RichSimpleFeature\n",
   "\n",
   "    val aggregateSft = GeoMesaSparkKryoRegistrator.getType(\"countryInformation\")\n",
   "     countable.value\n",
   "    val typeA = featureA.getType.getTypeName\n",
   "    val typeB = featureB.getType.getTypeName\n",
   "    val result = \n",
   "    // Case: combining two aggregate features\n",
   "    if (typeA == \"countryInformation\" && typeB == \"countryInformation\") {\n",
   "        // Combine the \"total\" properties\n",
   "        (featureA.getProperties, featureB.getProperties).zipped.foreach((propA, propB) => {\n",
   "            val name = propA.getName.toString\n",
   "            if (propA.getName.toString.startsWith(\"total_\") || propA.getName.toString == \"count\") {  \n",
   "                val sum = (propA.getValue, propB.getValue) match {\n",
   "                    case (a: Integer, b: Integer) => a + b\n",
   "                    case (a: java.lang.Long, b: java.lang.Long) => a + b\n",
   "                    case (a: java.lang.Double, b: java.lang.Double) => a + b\n",
   "                    case _ => throw new Exception(\"Couldn't match countable type.\")\n",
   "                }\n",
   "                featureA.setAttribute(propA.getName, sum)             \n",
   "            }\n",
   "        })\n",
   "        featureA\n",
   "        \n",
   "    // Case: combining two gdelt features\n",
   "    } else if (typeA != \"countryInformation\" && typeB != \"countryInformation\") {\n",
   "        \n",
   "        // Grab each feature's properties\n",
   "        val featurePropertiesA = featureA.getProperties.toSeq\n",
   "        val featurePropertiesB = featureB.getProperties.toSeq\n",
   "        // Create a new aggregate feature to hold the result\n",
   "        val featureFields = Seq(\"empty\", featureA.geometry) ++ Seq.fill(aggregateSft.getTypes.size-2)(\"0\")                        \n",
   "        val aggregateFeature = ScalaSimpleFeatureFactory.buildFeature(aggregateSft, featureFields, featureA.getID)                      \n",
   "        \n",
   "        // Loop over the countable properties and sum them for both gdelt simple features\n",
   "        countable.value.foreach { case (index, clazz) =>\n",
   "            val propA = featurePropertiesA(index)\n",
   "            val propB = featurePropertiesB(index)\n",
   "            val valA = if (propA == null) 0 else propA.getValue\n",
   "            val valB = if (propB == null) 0 else propB.getValue\n",
   "\n",
   "            // Set the total\n",
   "            if( propA != null && propB != null) {\n",
   "                val sum  = (valA, valB) match {\n",
   "                    case (a: Integer, b: Integer) => a + b\n",
   "                    case (a: java.lang.Long, b: java.lang.Long) => a + b\n",
   "                    case (a: java.lang.Double, b: java.lang.Double) => a + b\n",
   "                    case x => throw new Exception(s\"Couldn't match countable type. $x\")\n",
   "                }\n",
   "                aggregateFeature.setAttribute(s\"total_${propA.getName.toString}\", sum)\n",
   "            } else {\n",
   "                val sum = if (valA != null) valA else if (valB != null) valB else 0\n",
   "                aggregateFeature.setAttribute(s\"total_${propB.getName.toString}\", sum)\n",
   "            }\n",
   "        }\n",
   "        aggregateFeature.setAttribute(countIndex.value, new Integer(2))              \n",
   "        aggregateFeature\n",
   "    \n",
   "    // Case: combining a mix\n",
   "    } else {\n",
   "        \n",
   "        // Figure out which feature is which\n",
   "        val (aggFeature: SimpleFeature, geoFeature: SimpleFeature) = \n",
   "            if (typeA == \"countryInformation\" && typeB != \"countryInformation\") {\n",
   "                (featureA, featureB)\n",
   "            } else {\n",
   "                (featureB, featureA)\n",
   "            }\n",
   "        \n",
   "        // Loop over the aggregate feature's properties, adding on the regular feature's properties\n",
   "        aggFeature.getProperties.foreach{prop =>             \n",
   "            val name = prop.getName.toString\n",
   "            if (name.startsWith(\"total_\")) {\n",
   "                val geoProp = geoFeature.getProperty(name.substring(6))\n",
   "                if (geoProp != null) {\n",
   "                    val sum = (prop.getValue, geoProp.getValue) match {\n",
   "                        case (a: Integer, b: Integer) => a + b\n",
   "                        case (a: java.lang.Long, b: java.lang.Long) => a + b\n",
   "                        case (a: java.lang.Double, b: java.lang.Double) => a + b\n",
   "                        case _ => 0\n",
   "                    }          \n",
   "                    aggFeature.setAttribute(name, sum)\n",
   "                }\n",
   "            }\n",
   "\n",
   "        }\n",
   "        aggFeature.setAttribute(countIndex.value, aggFeature.get[Integer](\"count\") + 1)\n",
   "        aggFeature\n",
   "    } \n",
   "    \n",
   "    result\n",
   "})"
  ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Send a map of country name -> geom to the executors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "val countryMap: scala.collection.Map[String, Geometry] = \n",
    "    reduced.map{ case (key, sf) => \n",
    "        (sf.getAttribute(\"NAME\").asInstanceOf[String] -> sf.getAttribute(\"the_geom\").asInstanceOf[Geometry])\n",
    "    }.collectAsMap\n",
    "    \n",
    "val broadcastedCountryMap = sc.broadcast(countryMap)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Compute averages and set country names and geoms"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
   "val averaged = aggregate.mapPartitions{iter =>\n",
   "    import org.locationtech.geomesa.utils.geotools.Conversions.RichSimpleFeature\n",
   "\n",
   "    iter.flatMap{ case (countryName, sf) =>\n",
   "        if (sf.getType.getTypeName == \"countryInformation\") {\n",
   "            sf.getProperties.foreach{ prop =>\n",
   "                val name = prop.getName.toString\n",
   "                if (name.startsWith(\"total_\")) {\n",
   "                    val count = sf.get[Integer](\"count\")\n",
   "                    val avg = (prop.getValue) match {\n",
   "                            case (a: Integer) => a.toDouble / count\n",
   "                            case (a: java.lang.Long) => a.toDouble / count\n",
   "                            case (a: java.lang.Double) => a / count\n",
   "                            case _ => throw new Exception(s\"couldn't match $name\")\n",
   "                    }  \n",
   "\n",
   "                    sf.setAttribute(\"avg_\" + name.substring(6), avg)\n",
   "                }\n",
   "            }\n",
   "            sf.setAttribute(\"country\", countryName)\n",
   "            sf.setDefaultGeometry(broadcastedCountryMap.value.get(countryName).get)\n",
   "\n",
   "            Some(sf)\n",
   "        } else {\n",
   "            None\n",
   "        }\n",
   "    }\n",
   "}"
  ]
  },
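  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The reduce step above folds raw GDELT features into aggregate features in three cases. A common alternative, sketched here with plain Scala collections rather than Spark, first maps every record to a one-element aggregate so that a single merge function suffices. The `Agg` case class, the country codes `X` and `Y`, and the sample values are hypothetical and not part of the notebook.\n",
    "\n",
    "```scala\n",
    "// Hypothetical running aggregate for one numeric field\n",
    "case class Agg(count: Int, total: Double) {\n",
    "  def merge(other: Agg): Agg = Agg(count + other.count, total + other.total)\n",
    "  def avg: Double = total / count\n",
    "}\n",
    "\n",
    "// Made-up (country, goldsteinScale) pairs\n",
    "val events = Seq((\"X\", -2.0), (\"X\", -4.0), (\"Y\", 1.0))\n",
    "\n",
    "// Map every event to a one-element aggregate, then merge per country\n",
    "val byCountry: Map[String, Agg] =\n",
    "  events\n",
    "    .map { case (country, value) => country -> Agg(1, value) }\n",
    "    .groupBy(_._1)\n",
    "    .map { case (country, pairs) => country -> pairs.map(_._2).reduce(_ merge _) }\n",
    "\n",
    "val avgX = byCountry(\"X\").avg // -3.0\n",
    "val avgY = byCountry(\"Y\").avg // 1.0\n",
    "```\n",
    "\n",
    "In Spark, the same shape would be `mapValues` followed by `reduceByKey`. Because every value is already an aggregate before the reduce, a country with a single event keeps its total, whereas the averaging cell above drops keys whose value is still a raw GDELT feature."
   ]
  },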
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Export "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Add the GeoTools GeoJSON dependency"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%AddDeps org.geotools gt-geojson 14.1 --transitive --repository https:///repo.osgeo.org/repository/release"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Write Simple Features to GeoJSON"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import org.geotools.geojson.feature.FeatureJSON\n",
    "import java.io.StringWriter\n",
    "\n",
    "// Convert simple features to their GeoJson string representation\n",
    "val geoJsonWriters = averaged.mapPartitions{ iter => \n",
    "    val featureJson = new FeatureJSON()        \n",
    "    \n",
    "    val strRep = iter.map{ sf =>         \n",
    "        featureJson.toString(sf)\n",
    "    }          \n",
    "    // Join all the features on this partition\n",
    "    Iterator(strRep.mkString(\",\"))\n",
    "}\n",
    "\n",
    "// Collect these strings and joing them into a json array\n",
    "val geoJsonString = geoJsonWriters.collect.mkString(\"[\",\",\",\"]\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "#### Write the string to a file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import java.io.File\n",
    "import java.io.FileWriter\n",
    "val jsonFile = new File(\"aggregateGdeltEarthJuly.json\")\n",
    "val fw = new FileWriter(jsonFile)\n",
    "fw.write(geoJsonString)\n",
    "fw.close"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Visualize"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Add leaflet styles and javascript"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%%HTML\n",
    "<link rel=\"stylesheet\" href=\"http://cdn.leafletjs.com/leaflet/v0.7.7/leaflet.css\" />\n",
    "<script src=\"http://cdn.leafletjs.com/leaflet/v0.7.7/leaflet.js\"></script>\n",
    "<style>\n",
    ".info { padding: 6px 8px; font: 14px/18px Arial, Helvetica, sans-serif; background: white; background: rgba(255,255,255,0.8); box-shadow: 0 0 15px rgba(0,0,0,0.2); border-radius: 5px; } \n",
    ".info b { margin: 0 0 5px; color: #777; }\n",
    ".legend {\n",
    "    line-height: 18px;\n",
    "    color: #555;\n",
    "}\n",
    ".legend i {\n",
    "    width: 18px;\n",
    "    height: 18px;\n",
    "    float: left;    \n",
    "    opacity: 0.7;\n",
    "}</style>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Set up the map and populate it with the geojson data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "%%javascript\n",
    "\n",
    "(new MutationObserver(function() {\n",
    "    // START - leaflet\n",
    "    \n",
    "    // Add the base map and center around US\n",
    "    var map = L.map('map').setView([35.4746,-44.7022],3);\n",
    "    L.tileLayer(\"http://{s}.tile.osm.org/{z}/{x}/{y}.png\").addTo(map); \n",
    "    \n",
    "    // Function to set popups for each feature\n",
    "    function onEachFeature(feature, layer) {\n",
    "        layer.bindPopup(feature.properties.popupContent);        \n",
    "    }\n",
    "\n",
    "    // Colors for population levels\n",
    "    var colorRange = [\"#d73027\",\"#f46d43\",\"#fdae61\",\"#fee08b\",\"#ffffbf\",\"#d9ef8b\",\"#a6d96a\",\"#66bd63\",\"#1a9850\"];\n",
    "    var grades = [-3, -2.25, -1.5, -0.75, 0, 0.75, 1.5, 2.25, 3];\n",
    "    // Function to set popup content and fill color \n",
    "    function decorate(feature) {\n",
    "\n",
    "        // Set the popup content to be the country's properties\n",
    "        var popup = \"\";\n",
    "        for (var prop in feature.properties) {\n",
    "            popup += (prop + \": \" + feature.properties[prop] + \"<br/>\")            \n",
    "        }\n",
    "        feature.properties.popupContent = popup;    \n",
    "\n",
    "        // Set fill color based on goldstein scale\n",
    "        var fillColor = colorRange[8];        \n",
    "        for (var x = 0; x < 9; x++) {\n",
    "            if (feature.properties.avg_goldsteinScale < grades[x]) {\n",
    "                fillColor = colorRange[x]\n",
    "                break\n",
    "            }\n",
    "        }            \n",
    "\n",
    "        feature.properties.style = {\n",
    "            color: \"black\",\n",
    "            opacity: \".6\",\n",
    "            fillColor: fillColor,\n",
    "            weight: \".5\",\n",
    "            fillOpacity: \".6\"\n",
    "        }        \n",
    "    }\n",
    "\n",
    "    // Create the map legend\n",
    "    var legend = L.control({position: \"bottomright\"});\n",
    "\n",
    "    legend.onAdd = function (map) {\n",
    "\n",
    "        var div = L.DomUtil.create(\"div\", \"info legend\");\n",
    "\n",
    "        div.innerHTML+=\"<span>Avg. Goldstein Scale</span><br/>\";\n",
    "        // create a color tile for each interval\n",
    "        for (var i = 0; i < grades.length; i++) {\n",
    "            div.innerHTML +=\n",
    "                \"<i style='background:\" + colorRange[i] + \"'></i> \";\n",
    "        }\n",
    "        div.innerHTML += \"<br/>\";\n",
    "        \n",
    "        // label bounds of intervals\n",
    "        div.innerHTML += \"<i>\"+grades[0]+\"</i>\";\n",
    "        for (var i = 1; i < grades.length-1; i++) {\n",
    "            div.innerHTML +=\"<i></i>\"\n",
    "        }\n",
    "        div.innerHTML += \"<i>\"+grades[8]+\"</i>\";\n",
    "\n",
    "        return div;\n",
    "    };\n",
    "\n",
    "    legend.addTo(map);\n",
    "\n",
    "\n",
    "    var info = L.control();\n",
    "\n",
    "    info.onAdd = function (map) {\n",
    "        this._div = L.DomUtil.create(\"div\", \"info\");\n",
    "        this.update();\n",
    "        return this._div;\n",
    "    };\n",
    "\n",
    "    info.update = function (props) {\n",
    "        this._div.innerHTML = \"<b>GDELT Data by Country</b>\"\n",
    "    };\n",
    "\n",
    "    info.addTo(map);\n",
    "    // Open the geojson file and add it as a layer\n",
    "    var rawFile = new XMLHttpRequest();\n",
    "        rawFile.onreadystatechange = function () {                \n",
    "        if(rawFile.readyState === 4) {                                   \n",
    "            if(rawFile.status === 200 || rawFile.status == 0) {                \n",
    "                var allText = rawFile.response;\n",
    "                var gdeltJson = JSON.parse(allText)    \n",
    "                console.log(gdeltJson)\n",
    "                gdeltJson.forEach(decorate)\n",
    "                L.geoJson(gdeltJson, {\n",
    "                    style: function(feature) { return feature.properties.style},\n",
    "                    onEachFeature: onEachFeature\n",
    "                }).addTo(map); \n",
    "                // Css override\n",
    "                $('svg').css(\"max-width\",\"none\")\n",
    "            }\n",
    "        }\n",
    "    }        \n",
    "    rawFile.open(\"GET\", \"aggregateGdeltEarthJuly.json\", false);\n",
    "    rawFile.send()\n",
    "\n",
    "    //END - leaflet\n",
    "    this.disconnect()\n",
    "})).observe(element[0], {childList: true})\n",
    "\n",
    "\n",
    "element.append($('<div/>', { id: \"map\", width: \"100%\", height: \"512px\" }))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### The [Goldstein Scale](http://web.pdx.edu/~kinsella/jgscale.html) is a metric of how events contribute to the stability of a country. Here we see Ukraine and Central African Republic have low averages of this metric, implying that the events taking place have a negative potential impact on the region."
   ]
  }
 ],
 "metadata": {
  "celltoolbar": "Raw Cell Format",
  "kernelspec": {
   "display_name": "Spark GeoMesa 1.2.4 - Scala",
   "language": "scala",
   "name": "spark_geomesa_1.2.4_scala"
  },
  "language_info": {
   "name": "scala",
   "version": "2.10.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
